尽管建议尽可能利用库，但是通过观察如何使用“原生”DPC++内核实现每个模式，可以学到更多东西。\par

本章剩余部分的内核不会达到与高度调优库相同的性能水平，但是对于更好地理解DPC++的功能很有用——甚至可以作为新库的功能原型。\par

\begin{tcolorbox}[colback=blue!5!white,colframe=blue!75!black, title=使用供应商提供的库!]
当供应商提供函数库实现时，使用它而不是将函数重新实现总是有益的!
\end{tcolorbox}

\hspace*{\fill} \par %插入空行
\textbf{Map——映射模式}

由于其简单性，映射模式可以作为基本的并行内核直接实现。图14-13所示的代码显示了这样一个实现，使用映射模式计算范围内每个输入元素的平方根。\par

\hspace*{\fill} \par %插入空行
图14-13 数据并行内核中实现映射模式
\begin{lstlisting}[caption={}]
Q.parallel_for(N, [=](id<1> i) {
	output[i] = sqrt(input[i]);
}).wait();
\end{lstlisting}

\hspace*{\fill} \par %插入空行
\textbf{Stencil——模具模式}

如图14-14所示，将模具直接实现为多维缓冲区的数据并行内核，很简单，也很容易理解。\par

\hspace*{\fill} \par %插入空行
图14-14 数据并行内核中实现模具模式
\begin{lstlisting}[caption={}]
id<2> offset(1, 1);
h.parallel_for(stencil_range, offset, [=](id<2> idx) {
	int i = idx[0];
	int j = idx[1];
	
	float self = input[i][j];
	float north = input[i - 1][j];
	float east = input[i][j + 1];
	float south = input[i + 1][j];
	float west = input[i][j - 1];
	output[i][j] = (self + north + east + south + west) / 5.0f;
});
\end{lstlisting}

不过，这个模式的表达式非常简单，不能期望能执行得很好。正如本章前面提到的，需要使用局部性(通过空间或时间阻塞)来避免从内存中重复读取相同的数据。一个使用工作组本地内存的空间阻塞如图14-15所示。\par

\hspace*{\fill} \par %插入空行
图14-15 使用工作组本地内存在ND-Range内核中实现模具模式
\begin{lstlisting}[caption={}]
range<2> local_range(B, B);
// Includes boundary cells
range<2> tile_size = local_range + range<2>(2, 2);
auto tile = local_accessor<float, 2>(tile_size, h);

// Compute the average of each cell and its immediate neighbors
id<2> offset(1, 1);

h.parallel_for(
nd_range<2>(stencil_range, local_range, offset), [=](nd_item<2> it) {
	// Load this tile into work-group local memory
	id<2> lid = it.get_local_id();
	range<2> lrange = it.get_local_range();
	for (int ti = lid[0]; ti < B + 2; ti += lrange[0]) {
		int gi = ti + B * it.get_group(0);
		for (int tj = lid[1]; tj < B + 2; tj += lrange[1]) {
			int gj = tj + B * it.get_group(1);
			tile[ti][tj] = input[gi][gj];
		}
	}
	it.barrier(access::fence_space::local_space);
	
	// Compute the stencil using values from local memory
	int gi = it.get_global_id(0);
	int gj = it.get_global_id(1);
	
	int ti = it.get_local_id(0) + 1;
	int tj = it.get_local_id(1) + 1;
	
	float self = tile[ti][tj];
	float north = tile[ti - 1][tj];
	float east = tile[ti][tj + 1];
	float south = tile[ti + 1][tj];
	float west = tile[ti][tj - 1];
	output[gi][gj] = (self + north + east + south + west) / 5.0f;
});
\end{lstlisting}

要对给定的模具进行最佳优化，需要对块大小、邻域和模板函数本身进行压缩时检查，这其实很复杂，需要有更多讨论才能理解。。\par

\hspace*{\fill} \par %插入空行
\textbf{Reduction——归约模式}

通过利用提供工作项之间同步和通信功能(例如，原子操作、工作组和子工作组功能、子工作组混合)的特性，在DPC++中实现归约内核。图14-16和图14-17中的内核显示了两种可能的归约实现:使用parallel\_for和工作项的原子操作的简单归约，以及稍微一些简化，可以使用ND-Range paralle\_for和工作组reduce函数对局部性进行利用。我们将在第19章更详细地讨论这些原子操作。\par

\hspace*{\fill} \par %插入空行
图14-16 为数据并行内核的简单归约
\begin{lstlisting}[caption={}]
Q.parallel_for(N, [=](id<1> i) {
	atomic_ref<
	int,
	memory_order::relaxed,
	memory_scope::system,
	access::address_space::global_space>(*sum) += data[i];
}).wait();
\end{lstlisting}

\hspace*{\fill} \par %插入空行
图14-17 为ND-Range内核的简单归约
\begin{lstlisting}[caption={}]
Q.parallel_for(nd_range<1>{N, B}, [=](nd_item<1> it) {
	int i = it.get_global_id(0);
	int group_sum = reduce(it.get_group(), data[i], plus<>());
	if (it.get_local_id(0) == 0) {
		atomic_ref<
		int,
		memory_order::relaxed,
		memory_scope::system,
		access::address_space::global_space>(*sum) += group_sum;
	}
}).wait();
\end{lstlisting}

还有许多其他方法可以编写归约内核，由于对原子操作的硬件支持、工作组本地内存大小、全局内存大小、快速设备范围栅栏的可用性、甚至专用归约指令的可用性也不同，不同的设备可能更喜欢不同的实现。某些体系结构上，使用log2(N)单独的内核调用来执行树形归约可能会更快(或更必要)。\par

强烈建议只有在DPC++归约库不支持的情况下，或者在为特定设备的功能对内核进行微调时，才需要考虑手动实现归约——即使这样，也要在100\%确定归约库的性能不佳之后才考虑这样做!\par

\hspace*{\fill} \par %插入空行
\textbf{Scan——扫描模式}

实现并行扫描需要对数据进行多次扫描，每次扫描之间都要进行同步。由于DPC++没有提供同步ND-Range内所有工作项的机制，所以必须使用多个内核来实现设备范围内扫描的实现，这些内核通过全局内存传递结果。\par

图14-18、14-19和14-20所示的代码演示了使用多个内核实现的包含第一个元素扫描。第一个内核将输入值分布到多个工作组，在工作组本地内存中，计算工作组本地扫描(注意，可以使用工作组inclusive\_scan函数来代替)结果。第二个内核使用单个工作组计算本地扫描，对每个块的最终值进行计算。第三个内核结合这些中间结果来最后确定前缀和。这三个内核对应图14-5中的三个层。\par

\hspace*{\fill} \par %插入空行
图14-18 ND-Range内核中实现全局包含扫描的第1阶段:跨工作组计算
\begin{lstlisting}[caption={}]
// Phase 1: Compute local scans over input blocks
q.submit([&](handler& h) {
	auto local = local_accessor<int32_t, 1>(L, h);
	h.parallel_for(nd_range<1>(N, L), [=](nd_item<1> it) {
		int i = it.get_global_id(0);
		int li = it.get_local_id(0);
		
		// Copy input to local memory
		local[li] = input[i];
		it.barrier();
		
		// Perform inclusive scan in local memory
		for (int32_t d = 0; d <= log2((float)L) - 1; ++d) {
			uint32_t stride = (1 << d);
			int32_t update = (li >= stride) ? local[li - stride] : 0;
			it.barrier();
			local[li] += update;
			it.barrier();
		}
	
		// Write the result for each item to the output buffer
		// Write the last result from this block to the temporary buffer
		output[i] = local[li];
		if (li == it.get_local_range()[0] - 1)
			tmp[it.get_group(0)] = local[li];
	});
}).wait();
\end{lstlisting}

\hspace*{\fill} \par %插入空行
图14-19 ND-Range内核中实现全局包含扫描的第2阶段:扫描每个工作组的结果
\begin{lstlisting}[caption={}]
// Phase 2: Compute scan over partial results
q.submit([&](handler& h) {
	auto local = local_accessor<int32_t, 1>(G, h);
	h.parallel_for(nd_range<1>(G, G), [=](nd_item<1> it) {
		int i = it.get_global_id(0);
		int li = it.get_local_id(0);
		
		// Copy input to local memory
		local[li] = tmp[i];
		it.barrier();
		
		// Perform inclusive scan in local memory
		for (int32_t d = 0; d <= log2((float)G) - 1; ++d) {
			uint32_t stride = (1 << d);
			int32_t update = (li >= stride) ? local[li - stride] : 0;
			it.barrier();
			local[li] += update;
			it.barrier();
		}
	
		// Overwrite result from each work-item in the temporary buffer
		tmp[i] = local[li];
	});
}).wait();
\end{lstlisting}

\hspace*{\fill} \par %插入空行
图14-20 ND-Range内核中实现全局包含扫描的第三阶段(最终阶段)
\begin{lstlisting}[caption={}]
// Phase 3: Update local scans using partial results
q.parallel_for(nd_range<1>(N, L), [=](nd_item<1> it) {
	int g = it.get_group(0);
	if (g > 0) {
		int i = it.get_global_id(0);
		output[i] += tmp[g - 1];
	}
}).wait();
\end{lstlisting}

图14-18和14-19非常相似，唯一的区别是范围的大小，以及如何处理输入和输出值。该模式的实际实现可以使用带有不同参数的函数来实现这两个阶段，这里将它们作为不同的代码。\par

\hspace*{\fill} \par %插入空行
\textbf{打包和解包}

打包和解包也称为收集和分散操作。这些操作处理数据在内存中的排列方式，以及呈现给计算资源的方式上。\par

\hspace*{\fill} \par %插入空行
\textbf{打包}

由于打包依赖于排除第一个元素扫描，实现一个适用于ND-Range的所有元素的打包，必须通过全局内存和在几个内核排队的过程中进行。但是，对于包有一个通用的用例，不需要在ND-Range的所有元素上应用操作——仅在特定工作组或子工作组中的项上应用打包。\par

图14-21中的代码片段展示了如何在排除第一个元素的扫描中，实现组打包操作。\par

\hspace*{\fill} \par %插入空行
图14-21 排除第一个元素的扫描中实现组打包操作
\begin{lstlisting}[caption={}]
uint32_t index = exclusive_scan(g, (uint32_t) predicate, plus<>());
if (predicate)
	dst[index] = value;
\end{lstlisting}

图14-22中的代码演示了如何在内核中使用这样的包操作，来构建一个元素列表，这些元素需要一些额外的后处理(在将来的内核中)。所示的示例基于分子动力学模拟中的一个真实内核:分配给粒子i的子工作组中的工作项协作识别与i固定距离内的所有其他粒子，只有这个“相邻列表”中的粒子将被用于计算作用于每个粒子上的力。。\par

\hspace*{\fill} \par %插入空行
图14-22 使用工作组打包操作构建需要额外后处理的元素列表
\begin{lstlisting}[caption={}]
range<2> global(N, 8);
range<2> local(1, 8);
Q.parallel_for(
nd_range<2>(global, local),
[=](nd_item<2> it) [[cl::intel_reqd_sub_group_size(8)]] {
	int i = it.get_global_id(0);
	sub_group sg = it.get_sub_group();
	int sglid = sg.get_local_id()[0];
	int sgrange = sg.get_max_local_range()[0];
	
	uint32_t k = 0;
	for (int j = sglid; j < N; j += sgrange) {
		
		// Compute distance between i and neighbor j
		float r = distance(position[i], position[j]);
		
		// Pack neighbors that require post-processing into a list
		uint32_t pack = (i != j) and (r <= CUTOFF);
		uint32_t offset = exclusive_scan(sg, pack, plus<>());
		if (pack)
			neighbors[i * MAX_K + k + offset] = j;
		
		// Keep track of how many neighbors have been packed so far
		k += reduce(sg, pack, plus<>());
	}
	num_neighbors[i] = reduce(sg, k, maximum<>());
}).wait();
\end{lstlisting}

注意，打包模式不重新排序元素——打包到输出数组中的元素的顺序与输入数组中的顺序相同。打包的这个属性很重要，使我们能够使用打包功能来实现其他更抽象的并行算法(比如std::copy\_if和std::stable\_partition)。然而，在打包功能上还可以实现其他并行算法，这些算法不需要维护顺序(比如：std::partition)。\par

\hspace*{\fill} \par %插入空行
\textbf{解包}

和打包一样，可以使用scan实现解包。在排除第一个元素扫描的基础上实现子工作组解包操作，如图14-23所示。\par

\hspace*{\fill} \par %插入空行
图14-23 排除第一个元素扫描的基础上实现子工作组解包操作
\begin{lstlisting}[caption={}]
uint32_t index = exclusive_scan(sg, (uint32_t) predicate, plus<>());
return (predicate) ? new_value[index] : original_value;
\end{lstlisting}

图14-24中的代码展示了如何使用这样的子工作组解包，来改平衡有不同控制流的内核(本例中，是计算Mandelbrot集)。每个工作项分配了单独的像素来计算和迭代，直到收敛或达到最大迭代次数。然后使用解包操作将已完成的像素替换为新像素。\par

\hspace*{\fill} \par %插入空行
图14-24 使用子工作组解包操作来平衡不同控制流的内核
\begin{lstlisting}[caption={}]
// Keep iterating as long as one work-item has work to do
while (any_of(sg, i < Nx)) {
	uint32_t converged =
		next_iteration(params, i, j, count, cr, ci, zr, zi, mandelbrot);
	if (any_of(sg, converged)) {
		3
		// Replace pixels that have converged using an unpack
		// Pixels that haven't converged are not replaced
		uint32_t index = exclusive_scan(sg, converged, plus<>());
		i = (converged) ? iq + index : i;
		iq += reduce(sg, converged, plus<>());
		
		// Reset the iterator variables for the new i
		if (converged)
		reset(params, i, j, count, cr, ci, zr, zi);
	}
}
\end{lstlisting}

这种方法提高效率(并减少执行时间)的程度高度依赖于应用程序和输入，因为检查完成和执行解包操作都会带来开销!因此，在实际应用程序中成功地使用此模式，需要一些基于发散量和正在执行的计算的微调(例如，引入启发式方法，仅在活动工作项的数量低于某个阈值时执行解包操作)。\par






















